对每个人而言,真正的职责只有一个:找到自我。然后在心中坚守其一生,全心全意,永不停息。所有其它的路都是不完整的,是人的逃避方式,是对大众理想的懦弱回归,是随波逐流,是对内心的恐惧 ——赫尔曼·黑塞《德米安》
写在前面
- 工作中遇到,有大佬做了解答,简单整理
- 阻塞的主要原因是
网络IO 密集型
和 CPU 密集型
是两个不同的概念, ASGI
更多的是面向 网络/IO 密集型的非阻塞处理,不适用 CPU 密集型
- 理解不足小伙伴帮忙指正
对每个人而言,真正的职责只有一个:找到自我。然后在心中坚守其一生,全心全意,永不停息。所有其它的路都是不完整的,是人的逃避方式,是对大众理想的懦弱回归,是随波逐流,是对内心的恐惧 ——赫尔曼·黑塞《德米安》
在使用 FastAPI
做 web
服务的时候, 使用 BackgroundTasks
执行CPU密集型或者 IO 密集型任务,会阻塞当前 web 服务的所有接口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @app.get('/face_recognition_activated') async def face_recognition_activated(level : int,background_tasks: BackgroundTasks,token: bool = Depends(get_current_token)): """ @Time : 2023/10/20 03:28:11 @Author : liruilonger@gmail.com @Version : 1.0 @Desc : 开始进行人脸识别 """ background_tasks.add_task(face_recognition, data = { "dir_name":"A205_20237test4", "class_code": "A0205", "real_real_id": [2747,2745,345435] }) return {"status": 200,"message": "人脸识别开始了 🚀🚀🚀🚀🚀" }
|
对应的后台任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| async def face_recognition(c_data): """ @Time : 2023/10/20 05:09:23 @Author : liruilonger@gmail.com @Version : 1.0 @Desc : 人脸识别 """ logging.info("开始人脸数据库特征提取!") .................... pbar_i = tqdm( range(0, len(face_list)), desc="特征载入内存:👀👀👀", mininterval=0.001, smoothing=0.0001, colour='green', ) for idx in pbar_i: face_id = face_list[idx] face_id = face_id.decode('utf-8') face_score_key = class_code_account_period +"_face_score" faces = pkl.rc.zrange(face_score_key + ":" + face_id, 0, -1) meta_dates = [] for hallmark_id in faces: face_recognition_key = class_code_account_period + "_face_recognition" face_r = pkl.rc.hget( face_recognition_key + ":" + face_id, hallmark_id) face_path_key = class_code_account_period + ':Path' f_path = pkl.rc.hget(face_path_key, face_id) meta_date = {"hallmark_id": hallmark_id.decode('utf-8'), "face_r": pickle.loads(face_r), "f_path": f_path.decode('utf-8'), "mass": 500, } meta_dates.append(meta_date) checks.append({"face_id": face_id, "meta_dates": meta_dates})
logging.info("构建内存人脸库完成") logging.info("开始人脸识别..") r_p = RedisClient(1) logging.info("人脸识别后台任务启动......") consumer_task = asyncio.create_task(AdafaceFaceIdentification.consumer_facial_feature_clustering(global_object,checks,r_p,class_code_account_period,c_data)) await asyncio.gather(consumer_task)
|
对于这种情况,这是因为 对应的 后台任务被定义为 async
, 意味着 fastapi
会在 asyncio
事件循环中运行它。并且因为 对应后台任务的某一环节是同步的(即不等待某些 IO
或者是网络请求,而是进行计算)只要它正在运行,它就会阻塞事件循环。
这有在涉及异步IO
和网络操作
的情况下,asyncio
才不会阻塞,能够以非阻塞的方式运行,从而充分利用系统资源并提高应用程序的并发性能。
解决这个问题的几种方法:
- 使用更多的工人(例如
uvicorn main:app --workers 4
)。这将允许最多 4 个 后台任务 并行。
- 将任务重写为不是
async
(即将其定义为 def task(data)
: … 等)。然后 starlette 将在单独的线程中运行它。
- 使用
fastapi.concurrency.run_in_threadpool
,这也将在单独的线程中运行它。像这样:
1 2 3 4 5
| from fastapi.concurrency import run_in_threadpool async def task(data): otherdata = await db.fetch("some sql") newdata = await run_in_threadpool(lambda: somelongcomputation(data, otherdata)) await db.execute("some sql", newdata)
|
- 或者直接使用
asyncios
的 run_in_executor
(其中 run_in_threadpool
在后台使用):
1 2 3 4 5 6 7
| import asyncio async def task(data): otherdata = await db.fetch("some sql") loop = asyncio.get_running_loop() newdata = await loop.run_in_executor(None, lambda: somelongcomputation(data, otherdata)) await db.execute("some sql", newdata)
|
- 自己生成一个单独的线程/进程。例如使用
concurrent.futures
- 使用更重的东西,如芹菜。 (也在 此处 的 fastapi 文档中提到)。
博文部分内容参考
© 文中涉及参考链接内容版权归原作者所有,如有侵权请告知 :)
https://segmentfault.com/q/1010000043296883
https://stackoverflow.com/questions/63169865/how-to-do-multiprocessing-in-fastapi/63171013#63171013
© 2018-至今 liruilonger@gmail.com, All rights reserved. 保持署名-非商用-相同方式共享(CC BY-NC-SA 4.0)